import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

/**
 * Description
 * <p>
 * </p>
 * DATE 2018/9/24.
 *
 * @author liwei5.
 */
public class Startup {
    private static final String KAFKA_BROKER_IP_AND_PORT = "10.32.8.10:9092";

    public static void main(String[] args) {
        KafkaStreams streams = buildKafkaStreams();
        streams.start();
    }

    public static KafkaStreams buildKafkaStreams() {
        Properties props = buildProperties();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("wc-input");
        KTable<String, Long> counts = source
                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        System.out.println("收到值: " + value);
                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
                    }
                }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
                    @Override
                    public KeyValue<String, String> apply(String key, String value) {
                        System.out.println("收到值key: " + key + ", value: " + value);
                        return new KeyValue<>(value, value);
                    }
                })
                .groupByKey()
                .count("Counts");
        counts.print();
        KafkaStreams streams = new KafkaStreams(builder, props);
        return streams;
    }

    public static Properties buildProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_IP_AND_PORT);
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}
